// Copyright Epic Games, Inc. All Rights Reserved.

#include "zen.h"

#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
#include <zencore/compactbinaryvalidation.h>
#include <zencore/fmtutils.h>
#include <zencore/session.h>
#include <zencore/stream.h>
#include <zenhttp/formatters.h>
#include <zenhttp/httpcommon.h>
#include <zenutil/packageformat.h>

#include <zenstore/cache/structuredcachestore.h>
#include "diag/logging.h"

ZEN_THIRD_PARTY_INCLUDES_START
#include <cpr/cpr.h>
ZEN_THIRD_PARTY_INCLUDES_END

#include <xxhash.h>
#include <gsl/gsl-lite.hpp>

namespace zen {

namespace detail {
	// Bundles one cpr::Session with a reference to the client that created it.
	// Instances are handed out and taken back by ZenStructuredCacheClient.
	struct ZenCacheSessionState
	{
		ZenCacheSessionState(ZenStructuredCacheClient& Client) : OwnerClient(Client) {}
		~ZenCacheSessionState() = default;

		// Gives callers direct access to the wrapped session.
		cpr::Session& GetSession() { return Session; }

		// Applies the given timeouts and replaces any previously set headers
		// and body with empty ones.
		void Reset(std::chrono::milliseconds ConnectTimeout, std::chrono::milliseconds Timeout)
		{
			Session.SetConnectTimeout(ConnectTimeout);
			Session.SetTimeout(Timeout);
			Session.SetHeader({});
			Session.SetBody({});
		}

	private:
		ZenStructuredCacheClient& OwnerClient;
		cpr::Session			  Session;
	};

}  // namespace detail

//////////////////////////////////////////////////////////////////////////

ZenStructuredCacheClient::ZenStructuredCacheClient(const ZenStructuredCacheClientOptions& Options)
: m_Log(logging::Get(std::string_view("zenclient")))
, m_ServiceUrl(Options.Url)
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
{
}

// Deletes every session state currently held in the cache, holding the
// cache lock exclusively while doing so.
ZenStructuredCacheClient::~ZenStructuredCacheClient()
{
	RwLock::ExclusiveLockScope CacheLock(m_SessionStateLock);

	for (auto* CachedState : m_SessionStateCache)
	{
		delete CachedState;
	}
}

// Returns a session state that has been reset with this client's timeouts.
// An entry is taken from the front of the cache when one is available;
// otherwise a new one is allocated. The caller is expected to hand the
// pointer back through FreeSessionState().
detail::ZenCacheSessionState*
ZenStructuredCacheClient::AllocSessionState()
{
	detail::ZenCacheSessionState* Reusable = nullptr;

	{
		// Only the cache access itself happens under the lock
		RwLock::ExclusiveLockScope CacheLock(m_SessionStateLock);
		if (!m_SessionStateCache.empty())
		{
			Reusable = m_SessionStateCache.front();
			m_SessionStateCache.pop_front();
		}
	}

	detail::ZenCacheSessionState* State = (Reusable != nullptr) ? Reusable : new detail::ZenCacheSessionState(*this);
	State->Reset(m_ConnectTimeout, m_Timeout);

	return State;
}

// Puts State at the front of the session state cache, under the exclusive
// cache lock, so that a later AllocSessionState() call can reuse it.
void
ZenStructuredCacheClient::FreeSessionState(detail::ZenCacheSessionState* State)
{
	RwLock::ExclusiveLockScope CacheLock(m_SessionStateLock);

	m_SessionStateCache.push_front(State);
}

//////////////////////////////////////////////////////////////////////////

// Makes the standard library literal suffixes available to the code that follows.
using namespace std::literals;

// Takes over the client reference and obtains a session state from it.
//
// NOTE(review): the logger is read from OuterClient in the same initializer
// list that moves OuterClient into m_Client; this presumably relies on m_Log
// being declared before m_Client in the class - confirm against the header.
ZenStructuredCacheSession::ZenStructuredCacheSession(Ref<ZenStructuredCacheClient>&& OuterClient)
: m_Log(OuterClient->Log())
, m_Client(std::move(OuterClient))
{
	// The state is returned to the client again in the destructor
	m_SessionState = m_Client->AllocSessionState();
}

// Hands the session state obtained in the constructor back to the client.
ZenStructuredCacheSession::~ZenStructuredCacheSession()
{
	detail::ZenCacheSessionState* const State = m_SessionState;
	m_Client->FreeSessionState(State);
}

// Issues a GET against "<service url>/health/check".
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise the result carries the downloaded byte count, the elapsed time,
// the response's reason text and Success, which is true only for status
// code 200.
ZenCacheResult
ZenStructuredCacheSession::CheckHealth()
{
	ExtendableStringBuilder<256> Uri;
	Uri << m_Client->ServiceUrl() << "/health/check";

	cpr::Session& Session = m_SessionState->GetSession();
	Session.SetOption(cpr::Url{Uri.c_str()});
	cpr::Response Response = Session.Get();

	if (Response.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
	}

	// Report the reason alongside the status, matching the other request
	// methods, so a failed health check can be diagnosed by the caller
	return {.Bytes			= Response.downloaded_bytes,
			.ElapsedSeconds = Response.elapsed,
			.Reason			= Response.reason,
			.Success		= Response.status_code == 200};
}

// Fetches a cache record with a GET against
// "<service url>/z$/[<Namespace>/]<BucketId>/<Key as hex>". The namespace
// segment is left out when Namespace equals ZenCacheStore::DefaultNamespace.
// The Accept header is derived from Type via MapContentTypeToString().
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise Success is true only for status code 200, in which case Response
// holds a copy of the response body; for any other status Response is an
// empty IoBuffer. The response's reason text is reported in both cases.
ZenCacheResult
ZenStructuredCacheSession::GetCacheRecord(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, ZenContentType Type)
{
	ExtendableStringBuilder<256> Uri;
	Uri << m_Client->ServiceUrl() << "/z$/";
	if (Namespace != ZenCacheStore::DefaultNamespace)
	{
		Uri << Namespace << "/";
	}
	Uri << BucketId << "/" << Key.ToHexString();

	cpr::Session& Session = m_SessionState->GetSession();

	Session.SetOption(cpr::Url{Uri.c_str()});
	Session.SetHeader(cpr::Header{{"Accept", std::string{MapContentTypeToString(Type)}}});
	cpr::Response Response = Session.Get();
	ZEN_DEBUG("GET {}", Response);

	if (Response.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
	}

	const bool Success = Response.status_code == 200;

	// Not const, so that it can be moved into the result below
	IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();

	// Reason is included to match GetCacheChunk(), so callers can tell why a
	// non-200 response was returned
	return {.Response		= std::move(Buffer),
			.Bytes			= Response.downloaded_bytes,
			.ElapsedSeconds = Response.elapsed,
			.Reason			= Response.reason,
			.Success		= Success};
}

// Fetches a single value with a GET against
// "<service url>/z$/[<Namespace>/]<BucketId>/<Key as hex>/<ValueContentId as hex>",
// asking for "application/x-ue-comp" content. The namespace segment is left
// out when Namespace equals ZenCacheStore::DefaultNamespace.
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise Success is true only for status code 200, in which case Response
// holds a copy of the response body.
ZenCacheResult
ZenStructuredCacheSession::GetCacheChunk(std::string_view Namespace,
										 std::string_view BucketId,
										 const IoHash&	  Key,
										 const IoHash&	  ValueContentId)
{
	const bool IsDefaultNamespace = (Namespace == ZenCacheStore::DefaultNamespace);

	ExtendableStringBuilder<256> Target;
	Target << m_Client->ServiceUrl() << "/z$/";
	if (!IsDefaultNamespace)
	{
		Target << Namespace << "/";
	}
	Target << BucketId << "/" << Key.ToHexString();
	Target << "/" << ValueContentId.ToHexString();

	cpr::Session& Http = m_SessionState->GetSession();
	Http.SetOption(cpr::Url{Target.c_str()});
	Http.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}});

	cpr::Response Reply = Http.Get();
	ZEN_DEBUG("GET {}", Reply);

	if (Reply.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Reply.error.code), .Reason = std::move(Reply.error.message)};
	}

	// Only a 200 response has its body copied out
	const bool	   Ok	   = (Reply.status_code == 200);
	const IoBuffer Payload = Ok ? IoBufferBuilder::MakeCloneFromMemory(Reply.text.data(), Reply.text.size()) : IoBuffer();

	return {.Response = Payload, .Bytes = Reply.downloaded_bytes, .ElapsedSeconds = Reply.elapsed, .Reason = Reply.reason, .Success = Ok};
}

// Stores a cache record with a PUT against
// "<service url>/z$/[<Namespace>/]<BucketId>/<Key as hex>", sending the bytes
// of Value as the body. The namespace segment is left out when Namespace
// equals ZenCacheStore::DefaultNamespace.
//
// Content-Type is "application/x-ue-cbpkg" for kCbPackage,
// "application/x-ue-cb" for kCbObject and "application/octet-stream" for
// every other Type.
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise Success is true for status codes 200 and 201.
ZenCacheResult
ZenStructuredCacheSession::PutCacheRecord(std::string_view Namespace,
										  std::string_view BucketId,
										  const IoHash&	   Key,
										  IoBuffer		   Value,
										  ZenContentType   Type)
{
	const bool IsDefaultNamespace = (Namespace == ZenCacheStore::DefaultNamespace);

	ExtendableStringBuilder<256> Target;
	Target << m_Client->ServiceUrl() << "/z$/";
	if (!IsDefaultNamespace)
	{
		Target << Namespace << "/";
	}
	Target << BucketId << "/" << Key.ToHexString();

	// Anything that is neither a package nor an object goes out as raw bytes
	const char* ContentType = "application/octet-stream";
	if (Type == ZenContentType::kCbPackage)
	{
		ContentType = "application/x-ue-cbpkg";
	}
	else if (Type == ZenContentType::kCbObject)
	{
		ContentType = "application/x-ue-cb";
	}

	cpr::Session& Http = m_SessionState->GetSession();
	Http.SetOption(cpr::Url{Target.c_str()});
	Http.SetHeader(cpr::Header{{"Content-Type", ContentType}});
	Http.SetBody(cpr::Body{static_cast<const char*>(Value.Data()), Value.Size()});

	cpr::Response Reply = Http.Put();
	ZEN_DEBUG("PUT {}", Reply);

	if (Reply.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Reply.error.code), .Reason = std::move(Reply.error.message)};
	}

	const bool Ok = (Reply.status_code == 200) || (Reply.status_code == 201);

	return {.Bytes = Reply.uploaded_bytes, .ElapsedSeconds = Reply.elapsed, .Reason = Reply.reason, .Success = Ok};
}

// Stores a single value with a PUT against
// "<service url>/z$/[<Namespace>/]<BucketId>/<Key as hex>/<ValueContentId as hex>",
// sending the bytes of Payload as an "application/x-ue-comp" body. The
// namespace segment is left out when Namespace equals
// ZenCacheStore::DefaultNamespace.
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise Success is true for status codes 200 and 201.
ZenCacheResult
ZenStructuredCacheSession::PutCacheValue(std::string_view Namespace,
										 std::string_view BucketId,
										 const IoHash&	  Key,
										 const IoHash&	  ValueContentId,
										 IoBuffer		  Payload)
{
	const bool IsDefaultNamespace = (Namespace == ZenCacheStore::DefaultNamespace);

	ExtendableStringBuilder<256> Target;
	Target << m_Client->ServiceUrl() << "/z$/";
	if (!IsDefaultNamespace)
	{
		Target << Namespace << "/";
	}
	Target << BucketId << "/" << Key.ToHexString();
	Target << "/" << ValueContentId.ToHexString();

	cpr::Session& Http = m_SessionState->GetSession();
	Http.SetOption(cpr::Url{Target.c_str()});
	Http.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-comp"}});
	Http.SetBody(cpr::Body{static_cast<const char*>(Payload.Data()), Payload.Size()});

	cpr::Response Reply = Http.Put();
	ZEN_DEBUG("PUT {}", Reply);

	if (Reply.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Reply.error.code), .Reason = std::move(Reply.error.message)};
	}

	const bool Ok = (Reply.status_code == 200) || (Reply.status_code == 201);

	return {.Bytes = Reply.uploaded_bytes, .ElapsedSeconds = Reply.elapsed, .Reason = Reply.reason, .Success = Ok};
}

// Sends Request as an "application/x-ue-cb" body in a POST against
// "<service url>/z$/$rpc", asking for an "application/x-ue-cbpkg" reply.
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise Success is true only for status code 200, in which case Response
// holds a copy of the response body; for any other status Response is an
// empty IoBuffer.
//
// NOTE(review): Bytes reports the uploaded byte count even though a response
// payload is returned - confirm this is the intended metric.
ZenCacheResult
ZenStructuredCacheSession::InvokeRpc(const CbObjectView& Request)
{
	ExtendableStringBuilder<256> Uri;
	Uri << m_Client->ServiceUrl() << "/z$/$rpc";

	// Serialize the request object into a contiguous buffer for the body
	BinaryWriter Body;
	Request.CopyTo(Body);

	cpr::Session& Session = m_SessionState->GetSession();

	Session.SetOption(cpr::Url{Uri.c_str()});
	Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cb"}, {"Accept", "application/x-ue-cbpkg"}});
	Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Body.GetData()), Body.GetSize()});

	cpr::Response Response = Session.Post();
	ZEN_DEBUG("POST {}", Response);

	if (Response.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
	}

	const bool Success = Response.status_code == 200;

	// Deliberately not const: std::move on a const object would fall back to
	// a copy when the buffer is handed to the result
	IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();

	return {.Response		= std::move(Buffer),
			.Bytes			= Response.uploaded_bytes,
			.ElapsedSeconds = Response.elapsed,
			.Reason			= Response.reason,
			.Success		= Success};
}

// Sends Request, formatted by FormatPackageMessageBuffer() and flattened into
// one buffer, as an "application/x-ue-cbpkg" body in a POST against
// "<service url>/z$/$rpc", asking for an "application/x-ue-cbpkg" reply.
//
// If cpr reports an error, the result carries the error code and message.
// Otherwise Success is true only for status code 200, in which case Response
// holds a copy of the response body; for any other status Response is an
// empty IoBuffer.
//
// NOTE(review): Bytes reports the uploaded byte count even though a response
// payload is returned - confirm this is the intended metric.
ZenCacheResult
ZenStructuredCacheSession::InvokeRpc(const CbPackage& Request)
{
	ExtendableStringBuilder<256> Uri;
	Uri << m_Client->ServiceUrl() << "/z$/$rpc";

	SharedBuffer Message = FormatPackageMessageBuffer(Request).Flatten();

	cpr::Session& Session = m_SessionState->GetSession();

	Session.SetOption(cpr::Url{Uri.c_str()});
	Session.SetHeader(cpr::Header{{"Content-Type", "application/x-ue-cbpkg"}, {"Accept", "application/x-ue-cbpkg"}});
	Session.SetBody(cpr::Body{reinterpret_cast<const char*>(Message.GetData()), Message.GetSize()});

	cpr::Response Response = Session.Post();
	ZEN_DEBUG("POST {}", Response);

	if (Response.error)
	{
		return {.ErrorCode = static_cast<int32_t>(Response.error.code), .Reason = std::move(Response.error.message)};
	}

	const bool Success = Response.status_code == 200;

	// Deliberately not const: std::move on a const object would fall back to
	// a copy when the buffer is handed to the result
	IoBuffer Buffer = Success ? IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size()) : IoBuffer();

	return {.Response		= std::move(Buffer),
			.Bytes			= Response.uploaded_bytes,
			.ElapsedSeconds = Response.elapsed,
			.Reason			= Response.reason,
			.Success		= Success};
}

}  // namespace zen
